golang 的 chan select 實在太方便,其實任何提供了協程的語言都能很好且方便的支持 chan 和 select,因爲經常寫 typescript 腳本,於是我把這兩個組件實現到了一個 typescript,你可以直接使用我的庫來得到 chan 和 select,本文後續是實現代碼的分析,你也可以參照分析去任何支持協程的語言中把golang的特性發揚光大
首先我們要分析 chan 提供了哪些功能
所以我們不妨來單獨實現這些功能,最後將它們組合在一起就是完整的 chan 了
首先來實現一個 Reader
class Reader {
// 這個函數用來將值傳遞給 reader 讀取,意思是每當調用這個函數,正在阻塞讀取的 chan 就能得到值
invoke(val: IteratorResult<any>);
// 這個用來 關閉 reader,關閉後才能通知讀取的 協程 退出
close() ;
}
有了Reader.invoke 就能通知阻塞讀取的chan返回了,那 Reader 還需要提供一個接口來讀取值爲此我專門定義了 class ReadValue 代表一個讀取的任務,我們直接看代碼說明
interface Connection {
disconet(): void
}
class ReadValue {
constructor(private readonly p: Reader, private readonly callback: ReadCallback) { }
// 這個函數通知讀取成功,由 Reader 調用
invoke(val: IteratorResult<any>) {
// 這裏調用上層傳來的回調,通知上層(select 或者 chan)讀取到值了
this.callback(val)
}
// 這個函數從 reader 的讀取任務中撤銷註冊
// 記得我們還要實現 select 嗎?select 等待到一個 chan 成功後需要撤銷對其它 case 的讀寫
disconet() {
this.p.disconet(this)
}
}
ReadValue 代表了一個讀取任務(有誰調用了 chan 的 讀取),所以我們 Reader 實現就簡單了,每當上層代碼讀取 chan 我們就創建一個 ReadValue 把它加入到 Reader 的一個隊列中用於等待,當有值可讀了(Reader.invoke 被調用) 就找一個隨機的 ReadValue 去通知它讀取成功了
下面是 Reader 完整代碼
type ReadCallback = (val: IteratorResult<any>) => void
class Reader {
// 記錄 是否被關閉
private closed_ = false
// 保存讀取的任務隊列 (<-chan)
private vals = new Array<ReadValue>()
get isEmpty(): boolean {
return this.vals.length == 0
}
// 有值可讀了就調用這個函數
invoke(val: IteratorResult<any>) {
const vals = this.vals
switch (vals.length) {
case 0: // 如果沒有任務就拋出異常,實現正確的化永遠不會執行到 case 0
throw errChannelReaderEmpty
case 1: // 只有一個等待讀取的,就直接將 val 給它 並且,通知它成功
vals.pop()!.invoke(val)
return
}
// 如果多個讀取任務,就隨機選一個任務 通知它讀取成功
const last = vals.length - 1
const i = Math.floor(Math.random() * vals.length)
if (i != last) { //swap to end // 把隨機選取的任務交互到數組最後面,因爲數組刪除最後一個元素比較快
[vals[i], vals[last]] = [vals[last], vals[i]]
}
vals.pop()!.invoke(val)
}
// 關閉讀取
close() {
if (this.closed_) {
return
}
this.closed_ = true
const vals = this.vals
if (vals.length != 0) { // 通知所有等待中的 chan,已經關閉了快點返回,永遠不會有新的值可以讀到了
for (const val of vals) {
val.invoke(noResult) // 使用了一個 {done:true} 的 js 習慣用法來通知 迭代器 結束了,是的 你後續可以使用 js 的 await for of 來迭代 讀取這個 chan
}
vals.splice(0) // 將任務清空
}
}
// 這裏創建一個讀取任務,讀取成功就調用 回調函數
connect(callback: ReadCallback): ReadValue {
const val = new ReadValue(this, callback)
this.vals.push(val)
return val
}
// 這裏撤銷註冊的任務,主要用於 select 的實現
disconet(val: ReadValue) {
const vals = this.vals
for (let i = 0; i < vals.length; i++) {
if (vals[i] == val) {
vals.splice(i, 1)
break
}
}
}
}
Writer 的邏輯和 Reader 類似先看下最核心的定義
class Writer {
// 調用這個就將一個寫入任務完成,正在執行寫入的 chan 就會返回寫入成功
invoke() ;
// 調用這個關閉寫入,golang 裏面會導致正在執行寫入的 chan 拋出 panic
close();
}
下面來定義一個 WirteValue 來表示一個寫入任務
class WirteValue {
constructor(private readonly p: Writer,
private readonly callback: WriteCallback,
private readonly reject: RejectCallback,
public readonly value: any, // 這個屬性記錄了要寫入的值
) { }
// 調用這個通知寫入成功
invoke() {
this.callback(true)
}
// Writer 關閉時就調用這個,通知上層寫入任務失敗
error() {
const reject = this.reject
if (reject) {
try {
reject(errChannelClosed)
} catch (_) {
}
} else {
this.callback(false)
}
}
// 調用這個就撤銷寫入,同樣是爲了實現 select 準備的
disconet() {
this.p.disconet(this)
}
}
WirteValue 代表了一個寫入任務(有誰調用了 chan 的 寫入),每當上層代碼寫入 chan 我們就創建一個 WirteValue 把它加入到 Writer 的一個隊列中用於等待,當有值可寫了(Writer.invoke 被調用) 就找一個隨機的 WirteValue 去通知它寫入成功了
下面是 Writer 完整代碼
class Writer {
private closed_ = false
// 記錄註冊的寫入任務
private vals = new Array<WirteValue>()
get isEmpty(): boolean {
return this.vals.length == 0
}
// 可寫了就調用這個函數,它就會找一個寫入任務來寫入
invoke() {
const vals = this.vals
switch (vals.length) {
case 0: // 沒有任務,如果實現正確,永遠不會執行到 case 0
throw errChannelWriterEmpty
case 1: // 只有一個任務就直接讓它完成
const p = vals.pop()!
p.invoke()
return p.value // 把要寫的值返回給調用者去寫入
}
// 有多個任務,就隨機選一個去完成
const last = vals.length - 1
const i = Math.floor(Math.random() * vals.length)
if (i != last) { //swap to end //將完成的任務交互到數組末尾以便快速刪除
[vals[i], vals[last]] = [vals[last], vals[i]]
}
const p = vals.pop()!
p.invoke()
return p.value // 把要寫的值返回給調用者去寫入
}
// 關閉
close() {
if (this.closed_) {
return
}
this.closed_ = true
const vals = this.vals
if (vals.length != 0) {
// 通知所有未完成的任務寫入失敗
for (const val of vals) {
val.error()
}
vals.splice(0)
}
}
// 創建一個寫入任務
connect(callback: WriteCallback, reject: RejectCallback, val: any): WirteValue {
const result = new WirteValue(this, callback, reject, val)
this.vals.push(result)
return result
}
// 撤銷任務,爲實現 select 準備
disconet(val: WirteValue) {
const vals = this.vals
for (let i = 0; i < vals.length; i++) {
if (vals[i] == val) {
vals.splice(i, 1)
break
}
}
}
}
發現文章有點長,我將在下篇文章中分析如何先 chan 和 select 的